Skip to content
Snippets Groups Projects
Commit bb01531d authored by Yaroslav Dynnikov's avatar Yaroslav Dynnikov
Browse files

Reorganize logging facilities

Move tarantool+slog related stuff to a separate module `tlog`.
Introduce the `tlog!` macro for easier logging.
parent 74c8a370
No related branches found
No related tags found
1 merge request!13Reorganize logging facilities
Pipeline #3403 passed
use ::tarantool::tlua;
use rmp_serde;
use serde::{Deserialize, Serialize};
use slog::{debug, error, info, o};
use std::os::raw::c_int;
mod tarantool;
mod tlog;
mod traft;
pub struct InnerTest {
......@@ -139,17 +139,13 @@ fn main_run(stash: &Rc<RefCell<Stash>>) {
tarantool::set_cfg(&cfg);
let logger = slog::Logger::root(tarantool::SlogDrain, o!());
info!(logger, "Hello, Rust!"; "module" => std::module_path!());
debug!(
logger,
tlog!(Info, "Hello, Rust!"; "module" => std::module_path!());
tlog!(
Debug,
"Picodata running on {} {}",
tarantool::package(),
tarantool::version()
);
// raft_main();
}
fn get_stash(stash: &Rc<RefCell<Stash>>) {
......@@ -186,25 +182,24 @@ impl From<RaftEntryData> for Vec<u8> {
fn raft_propose(stash: &Rc<RefCell<Stash>>, entry_data: RaftEntryData) {
let mut stash: RefMut<Stash> = stash.borrow_mut();
let raft_node = stash.raft_node.as_mut().unwrap();
let logger = slog::Logger::root(tarantool::SlogDrain, o!());
let data: Vec<u8> = entry_data.into();
info!(
logger,
tlog!(
Info,
"propose binary data ({} bytes).......................................",
data.len()
);
raft_node.borrow_mut().propose(vec![], data).unwrap();
info!(logger, ",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,");
tlog!(Info, ",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,");
}
fn handle_committed_data(logger: &slog::Logger, data: &[u8]) {
fn handle_committed_data(data: &[u8]) {
use RaftEntryData::*;
match data.try_into() {
Ok(x) => match x {
EvalLua(code) => crate::tarantool::eval(&code),
Info(msg) => info!(logger, "{}", msg),
Info(msg) => tlog!(Info, "{}", msg),
},
Err(why) => error!(logger, "cannot decode raft entry data: {}", why),
Err(why) => tlog!(Error, "cannot decode raft entry data: {}", why),
}
}
......@@ -78,58 +78,3 @@ pub fn eval(code: &str) {
let f = LuaFunction::load(l, code).unwrap();
f.call().unwrap()
}
pub use self::slog::Drain as SlogDrain;
mod slog {
pub struct Drain;
impl slog::Drain for Drain {
type Ok = ();
type Err = slog::Never;
fn log(
&self,
record: &slog::Record,
values: &slog::OwnedKVList,
) -> Result<Self::Ok, Self::Err> {
use ::tarantool::log::say;
use ::tarantool::log::SayLevel;
// Max level is constant = trace
// It's hardcoded in Cargo.toml dependency features
// In runtime it's managed by tarantool box.cfg.log_level
let lvl = match record.level() {
slog::Level::Critical => SayLevel::Crit,
slog::Level::Error => SayLevel::Error,
slog::Level::Warning => SayLevel::Warn,
slog::Level::Info => SayLevel::Info,
slog::Level::Debug => SayLevel::Verbose,
slog::Level::Trace => SayLevel::Debug,
};
let mut s = StrSerializer {
str: format!("{}", record.msg()),
};
use slog::KV;
// It's safe to use .unwrap() here since
// StrSerializer doesn't return anything but Ok()
record.kv().serialize(record, &mut s).unwrap();
values.serialize(record, &mut s).unwrap();
say(lvl, record.file(), record.line() as i32, None, &s.str);
Ok(())
}
}
struct StrSerializer {
pub str: String,
}
impl slog::Serializer for StrSerializer {
fn emit_arguments(&mut self, key: slog::Key, val: &std::fmt::Arguments) -> slog::Result {
self.str.push_str(&format!(", {}: {}", key, val));
Ok(())
}
}
}
pub struct Drain;
pub fn root() -> slog::Logger {
slog::Logger::root(Drain, slog::o!())
}
#[macro_export]
macro_rules! tlog {
($lvl:ident, $($args:tt)*) => {{
let logger = $crate::tlog::root();
slog::slog_log!(logger, slog::Level::$lvl, "", $($args)*);
}}
}
impl slog::Drain for Drain {
type Ok = ();
type Err = slog::Never;
fn log(
&self,
record: &slog::Record,
values: &slog::OwnedKVList,
) -> Result<Self::Ok, Self::Err> {
use ::tarantool::log::say;
use ::tarantool::log::SayLevel;
// Max level is constant = trace
// It's hardcoded in Cargo.toml dependency features
// In runtime it's managed by tarantool box.cfg.log_level
let lvl = match record.level() {
slog::Level::Critical => SayLevel::Crit,
slog::Level::Error => SayLevel::Error,
slog::Level::Warning => SayLevel::Warn,
slog::Level::Info => SayLevel::Info,
slog::Level::Debug => SayLevel::Verbose,
slog::Level::Trace => SayLevel::Debug,
};
let mut s = StrSerializer {
str: format!("{}", record.msg()),
};
use slog::KV;
// It's safe to use .unwrap() here since
// StrSerializer doesn't return anything but Ok()
record.kv().serialize(record, &mut s).unwrap();
values.serialize(record, &mut s).unwrap();
say(lvl, record.file(), record.line() as i32, None, &s.str);
Ok(())
}
}
struct StrSerializer {
pub str: String,
}
impl slog::Serializer for StrSerializer {
fn emit_arguments(&mut self, key: slog::Key, val: &std::fmt::Arguments) -> slog::Result {
self.str.push_str(&format!(", {}: {}", key, val));
Ok(())
}
}
use raft::prelude::*;
use raft::Error as RaftError;
use slog::info;
use std::ops::{Deref, DerefMut};
use std::cell::RefCell;
......@@ -9,35 +8,32 @@ use std::rc::Rc;
use std::time::Duration;
use super::storage::Storage;
use crate::tarantool::SlogDrain;
use crate::tlog;
use ::tarantool::fiber;
// pub type Node = RawNode<Storage>;
type RawNode = raft::RawNode<Storage>;
pub struct Node {
logger: slog::Logger,
raw_node: Rc<RefCell<RawNode>>,
main_loop: Option<fiber::LuaUnitJoinHandle>,
}
impl Node {
pub fn new(cfg: &raft::Config) -> Result<Self, RaftError> {
let logger = slog::Logger::root(SlogDrain, slog::o!());
let logger = tlog::root();
let raw_node = RawNode::new(cfg, Storage, &logger)?;
let raw_node = Rc::from(RefCell::from(raw_node));
let ret = Node {
logger,
raw_node,
main_loop: None,
};
Ok(ret)
}
pub fn start(&mut self, handle_committed_data: fn(&slog::Logger, &[u8])) {
pub fn start(&mut self, handle_committed_data: fn(&[u8])) {
assert!(self.main_loop.is_none(), "Raft loop is already started");
let logger = self.logger.clone();
let raw_node = self.raw_node.clone();
let loop_fn = move || {
loop {
......@@ -46,7 +42,7 @@ impl Node {
// let mut raft_node = stash.raft_node.as_mut().unwrap();
let mut raw_node = raw_node.borrow_mut();
raw_node.tick();
on_ready(&mut raw_node, &logger, handle_committed_data);
on_ready(&mut raw_node, handle_committed_data);
}
};
......@@ -68,24 +64,20 @@ impl DerefMut for Node {
}
}
fn on_ready(
raft_group: &mut RawNode,
logger: &slog::Logger,
handle_committed_data: fn(&slog::Logger, &[u8]),
) {
fn on_ready(raft_group: &mut RawNode, handle_committed_data: fn(&[u8])) {
if !raft_group.has_ready() {
return;
}
info!(logger, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv");
tlog!(Info, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv");
// Get the `Ready` with `RawNode::ready` interface.
let mut ready: raft::Ready = raft_group.ready();
info!(logger, "--- {:?}", ready);
tlog!(Info, "--- {:?}", ready);
let handle_messages = |msgs: Vec<Message>| {
for _msg in msgs {
info!(logger, "--- handle message: {:?}", _msg);
tlog!(Info, "--- handle message: {:?}", _msg);
// Send messages to other peers.
}
};
......@@ -98,14 +90,14 @@ fn on_ready(
if !ready.snapshot().is_empty() {
// This is a snapshot, we need to apply the snapshot at first.
let snap = ready.snapshot().clone();
info!(logger, "--- apply_snapshot: {:?}", snap);
tlog!(Info, "--- apply_snapshot: {:?}", snap);
unimplemented!();
// store.wl().apply_snapshot(snap).unwrap();
}
let handle_committed_entries = |committed_entries: Vec<Entry>| {
for entry in committed_entries {
info!(logger, "--- committed_entry: {:?}", entry);
tlog!(Info, "--- committed_entry: {:?}", entry);
Storage::persist_applied(entry.index);
if entry.data.is_empty() {
......@@ -114,7 +106,7 @@ fn on_ready(
}
if entry.get_entry_type() == EntryType::EntryNormal {
handle_committed_data(logger, entry.get_data())
handle_committed_data(entry.get_data())
}
// TODO: handle EntryConfChange
......@@ -126,7 +118,7 @@ fn on_ready(
// Append entries to the Raft log.
let entries = ready.entries();
for entry in entries {
info!(logger, "--- uncommitted_entry: {:?}", entry);
tlog!(Info, "--- uncommitted_entry: {:?}", entry);
}
Storage::persist_entries(entries);
......@@ -135,7 +127,7 @@ fn on_ready(
if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
// let hs = hs.clone();
info!(logger, "--- hard_state: {:?}", hs);
tlog!(Info, "--- hard_state: {:?}", hs);
Storage::persist_hard_state(&hs);
// store.wl().set_hardstate(hs);
}
......@@ -145,11 +137,11 @@ fn on_ready(
handle_messages(ready.take_persisted_messages());
}
info!(logger, "ADVANCE -----------------------------------------");
tlog!(Info, "ADVANCE -----------------------------------------");
// Advance the Raft.
let mut light_rd = raft_group.advance(ready);
info!(logger, "--- {:?}", light_rd);
tlog!(Info, "--- {:?}", light_rd);
// Update commit index.
if let Some(commit) = light_rd.commit_index() {
Storage::persist_commit(commit);
......@@ -160,5 +152,5 @@ fn on_ready(
handle_committed_entries(light_rd.take_committed_entries());
// Advance the apply index.
raft_group.advance_apply();
info!(logger, "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^");
tlog!(Info, "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^");
}
......@@ -4,8 +4,8 @@ use ::tarantool::tuple::Tuple;
use raft::eraftpb::ConfState;
use raft::StorageError;
use serde::{Deserialize, Serialize};
use slog::{debug, o};
use crate::tlog;
use raft::prelude::*;
use raft::Error as RaftError;
......@@ -158,8 +158,7 @@ impl raft::Storage for Storage {
};
let ret: RaftState = RaftState::new(hs, cs);
let logger = slog::Logger::root(crate::tarantool::SlogDrain, o!());
debug!(logger, "+++ initial_state() -> {:?}", ret);
tlog!(Debug, "+++ initial_state() -> {:?}", ret);
Ok(ret)
}
......@@ -182,12 +181,11 @@ impl raft::Storage for Storage {
let tuple = space.primary_key().get(&(idx,)).unwrap();
let row: Option<LogRow> = tuple.and_then(|t| t.into_struct().unwrap());
let logger = slog::Logger::root(crate::tarantool::SlogDrain, o!());
if let Some(row) = row {
debug!(logger, "+++ term(idx={}) -> {:?}", idx, row.raft_term);
tlog!(Debug, "+++ term(idx={}) -> {:?}", idx, row.raft_term);
return Ok(row.raft_term);
} else {
debug!(logger, "+++ term(idx={}) -> Unavailable", idx);
tlog!(Debug, "+++ term(idx={}) -> Unavailable", idx);
return Err(RaftError::Store(StorageError::Unavailable));
}
}
......@@ -202,16 +200,14 @@ impl raft::Storage for Storage {
let row: Option<LogRow> = tuple.and_then(|t| t.into_struct().unwrap());
let ret: u64 = row.map(|row| row.raft_index).unwrap_or(0);
let logger = slog::Logger::root(crate::tarantool::SlogDrain, o!());
debug!(logger, "+++ last_index() -> {:?}", ret);
Ok(ret)
}
fn snapshot(&self, request_index: u64) -> Result<Snapshot, RaftError> {
let logger = slog::Logger::root(crate::tarantool::SlogDrain, o!());
debug!(
logger,
"+++ snapshot(idx={}) -> unimplemented", request_index
tlog!(
Critical,
"+++ snapshot(idx={}) -> unimplemented",
request_index
);
unimplemented!();
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment